Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream response body in ASGITransport #3059

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

jhominal
Copy link
Member

@jhominal jhominal commented Jan 16, 2024

Summary

As part of my job, we needed a variant of ASGITransport that supports streaming (as in #2186), and this is my PR to implement that.

Something that I am particularly proud of is that this PR was written without having to spawn a new task, with the consequence that it avoids issues related to task groups and context variables.

Checklist

  • I understand that this PR may be closed in case there was no previous discussion. (This doesn't apply to typos!)
  • I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • I've updated the documentation accordingly.

Fixes #2186

Comment on lines 259 to 279
@pytest.mark.anyio
async def test_asgi_stream_returns_before_waiting_for_body():
start_response_body = anyio.Event()

async def send_response_body_after_event(scope, receive, send):
status = 200
headers = [(b"content-type", b"text/plain")]
await send(
{"type": "http.response.start", "status": status, "headers": headers}
)
await start_response_body.wait()
await send({"type": "http.response.body", "body": b"body", "more_body": False})

async with httpx.AsyncClient(app=send_response_body_after_event) as client:
async with client.stream("GET", "http://www.example.org/") as response:
assert response.status_code == 200
start_response_body.set()
await response.aread()
assert response.text == "body"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test makes sense to me yep.

How about the case where we're streaming the response body, and ensuring that we're able to receive it incrementally? Are we able to test that also?

await start_response_body.wait()
await send({"type": "http.response.body", "body": b"1", "more_body": True})
await keep_going.wait()
await send({"type": "http.response.body", "body": b"2", "more_body": True})
await nearly_there.wait()
await send({"type": "http.response.body", "body": b"3", "more_body": False})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should be able to test that, I will update the PR accordingly (I will also take in account the pending deprecation of the app argument in AsyncClient in the added tests)

@jhominal jhominal force-pushed the asgi-transport-streaming branch 2 times, most recently from 460832c to e249563 Compare January 17, 2024 17:24
@jhominal jhominal force-pushed the asgi-transport-streaming branch from e249563 to 672f459 Compare January 17, 2024 17:25
@jhominal
Copy link
Member Author

The PR was updated, with a new test_asgi_stream_allows_iterative_streaming test, and the added tests do not rely on the app argument in AsyncClient.

@jhominal
Copy link
Member Author

jhominal commented Jan 18, 2024

After working with the version of ASGITransport in this PR, it has come to my attention that this version does not work when streaming a starlette.responses.StreamingResponse object. The reason for that being, that starlette.responses.StreamingResponse puts the send calls in a spawned sub task, and that spawned sub task does not go through the generator in _AwaitableRunner.__call__.

I have an idea in order to fix that, here is how I think it could work:

  • Replace response_started bool with an Event object;
  • Replace body_chunks list with a trio object Channel / anyio memory stream ;
  • Replace the _AwaitableRunner.__call__ until Callable argument with an Awaitable ;
  • In _AwaitableRunner.__call__, instead of yielding the item from the generator directly, the yield self._next_item will "race" with the until Awaitable object, using structured concurrency to run that race;

I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.

@tomchristie
Copy link
Member

I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.

Either would be okay with me.

@souliane
Copy link

souliane commented Jun 6, 2024

Hi @jhominal,

[...] this version does not work when streaming a starlette.responses.StreamingResponse object. The reason for that being, that starlette.responses.StreamingResponse puts the send calls in a spawned sub task [...]

Could you please confirm that your current patch will also not work with sse-starlette, for the same reason, because of this:

            task_group.start_soon(wrap, partial(self.stream_response, send))

?

Do you know about some working alternatives? I have seen https://pypi.org/project/async-asgi-testclient/ and https://gist.github.com/richardhundt/17dfccb5c1e253f798999fc2b2417d7e, not sure what to think about it.

Thanks.

@jhominal
Copy link
Member Author

jhominal commented Jun 6, 2024

Hello @souliane

I had been working on and off on this issue for a while, in short:

  1. I had hoped that I would be able to avoid spawning a new task for running the ASGI app, because:
  • a new task means that context variables set in the app cannot be seen by the caller;
  • under structured concurrency I have to think about when and how the task group is shutdown (with the breakages that happen when the starting task is not the same as the ending task);
  1. I was able to avoid spawning a new task when running on asyncio (which is my production environment), but not when running on trio (anything I can imagine writing seems to have race conditions due to violating what I understand of trio invariants);
  2. I think I should just bite that bullet and spawn a task for running the ASGI app, with an option so that people can choose whether to spawn a task (and benefit from streaming response) or not;

@tomchristie
Copy link
Member

anything I can imagine writing seems to have race conditions due to violating what I understand of trio invariants

Yep, that's feasible. I don't really understand if that's inherent to ASGI, or particular to the interfacing a Request/Response API onto ASGI. Shrug.

I think I should just bite that bullet and spawn a task for running the ASGI app

Sure. Does it make sense to have a task group running over the lifespan of the transport? That's then well-bounded.

@zuoc1993
Copy link

Is this PR still ongoing? I encountered the same issue.

@tomchristie
Copy link
Member

@zuoc1993 It is still ongoing yep.

There's possibly a discussion to be had around the Transport API, and the limitations of a request -> response API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ASGITransport does not stream responses.
4 participants